#include	"supervisor.h"
#include	"resource.h"
#include	"logger.h"
#include	"watcher.h"

#include	<csignal>
#include	<cstring>
#include	<ctime>
#include	<iostream>
#include	<fstream>
#include	<regex>

#if defined(_WIN32)
#	include	<Windows.h>
#else
#	include	<sys/sysinfo.h>
#	include	<unistd.h>
#endif

// Extra HTTP header lines passed to Supervisor::Response().
#define		HEADER_HTML	"Content-type:text/html"
#define		HEADER_JSON	"Content-type:application/json"

// Canned response bodies. MSG_FORBIDDEN is an HTML page sent with status 403;
// the remaining ones are JSON documents sent with status 200 by the API handlers.
#define		MSG_FORBIDDEN	"<div style=\"text-align:center;margin-top:10%;margin-left:4%;margin-right:4%;\"><h2>403 Forbidden</h2><hr><p></p></div>"
#define		MSG_SUCCESS		"{\"success\": true }"
#define		MSG_BADPARAM	"{\"err\": \"Bad parameters!\"}"
#define		MSG_BADNAME		"{\"err\": \"A command with same name exists already!\"}"
#define		MSG_NEEDSTOP	"{\"err\": \"Need to stop command before editing it!\"}"
// NOTE(review): MSG_RUNNING is not referenced by the code visible in this file.
#define		MSG_RUNNING		"{\"err\": \"Command is running already!\"}"

/**
 * Connection event callback registered with mg_bind().
 *
 * Looks up the Supervisor instance stored in the manager's user_data and
 * forwards the event to the matching member function.
 *
 * @param pConn   Connection the event belongs to.
 * @param nEvent  One of the MG_EV_* event codes; unhandled codes are ignored.
 * @param pData   Event payload; treated as an http_message for HTTP request
 *                and websocket-handshake-done events.
 */
void Supervisor::Dispatch(mg_connection * pConn, int nEvent, void * pData) {
  Supervisor * pSelf = static_cast<Supervisor *>(pConn->mgr->user_data);

  if (nEvent == MG_EV_ACCEPT) {
    // Peers rejected by the address filter get a 403 page.
    const bool bAllowed = pSelf->CheckConnect(pConn);
    if (!bAllowed) pSelf->Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
    return;
  }

  if (nEvent == MG_EV_CLOSE) {
    pSelf->ResponseClose(pConn);
    return;
  }

  if (nEvent == MG_EV_WEBSOCKET_HANDSHAKE_DONE) {
    http_message * pShake = static_cast<http_message *>(pData);
    const std::string sTarget(pShake->uri.p, pShake->uri.len);
    pSelf->ResponseWebsocketOpened(pConn, sTarget);
    return;
  }

  if (nEvent != MG_EV_HTTP_REQUEST) return;

  http_message * pReq = static_cast<http_message *>(pData);

  // True when the URI is at least five bytes long and starts with the given
  // five-byte prefix.
  auto fStartsWith = [pReq](const char * pPrefix) {
    return pReq->uri.len > 4 && memcmp(pReq->uri.p, pPrefix, 5) == 0;
  };

  if (!pSelf->CheckAuth(pConn, pReq)) {
    pSelf->ResponseNeedAuth(pConn);
    return;
  }

  if (pReq->uri.len == 1) {
    // A one-character URI is served the index page.
    pSelf->ResponseResource(pConn, "res/index.html");
    return;
  }

  if (fStartsWith("/res/")) {
    // Resource keys are the URI without its leading slash.
    const std::string sKey(pReq->uri.p + 1, pReq->uri.len - 1);
    pSelf->ResponseResource(pConn, sKey);
    return;
  }

  if (fStartsWith("/api/")) {
    pSelf->ResponseApi(pConn, pReq);
    return;
  }

  pSelf->Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
}

/**
 * Entry point of the service: loads configuration and resources, binds the
 * HTTP/WebSocket listener, registers the API routes and runs the poll loop
 * until SIGINT or SIGTERM is received.
 *
 * Recognised command options (read through rCmd):
 *   help    print usage and return immediately
 *   port=N  listen port, "8088" when absent
 *   daemon  detach from the console / controlling terminal
 *
 * @param rCmd  Parsed command line.
 */
void Supervisor::Start(Command & rCmd) {
  if (rCmd.Has("help")) {
    std::cout
      << "\nSupervisor - System process controller for Unix. Version 1.0\n"
      << "Copyright(c) longshuang@msn.cn 2015-2016. All rights reserved.\n\n"
      << "[Usage]\n"
      << "\thelp\tShow this message.\n"
      << "\tport=N\tSet listen port.\n"
      << "\tdaemon\tRun in daemon mode.\n"
      << std::endl;
    return;
  }

  // Write the current settings out when no configuration file exists yet,
  // then load whatever is on disk.
#if defined(_WIN32)
  if (_access("supervisor.json", 0) != 0) SaveSetting();
#else
  if (access("supervisor.json", 0) != 0) SaveSetting();
#endif
  if (!LoadSetting()) return;

  if (!GResource.Load()) {
    LOG_ERR("Supervisor start failed due to load resource!");
    return;
  }

  // Termination is requested through a flag: the handlers only store to a
  // sig_atomic_t, and the actual shutdown work (stopping watchers, freeing
  // the manager) happens on the normal control path after the poll loop.
  // A captureless lambda may refer to this function-local static directly.
  static volatile std::sig_atomic_t s_nStop = 0;
  s_nStop = 0;

#if !defined(_WIN32)
  signal(SIGPIPE, SIG_IGN);
  // SIGKILL cannot be caught or ignored, so no handler is installed for it.
#endif
  signal(SIGINT, [](int) { s_nStop = 1; });
  signal(SIGTERM, [](int) { s_nStop = 1; });

  mg_mgr_init(&_iMgr, this);

  std::string sPort		= rCmd.Has("port") ? rCmd.Get("port") : "8088";
  mg_connection * pConn	= mg_bind(&_iMgr, sPort.c_str(), Supervisor::Dispatch);

  if (!pConn) {
    LOG_ERR("Supervisor start failed : mg_bind!");
    mg_mgr_free(&_iMgr);
    return;
  }

  mg_set_protocol_http_websocket(pConn);
  LOG_INFO("Supervisor started on :%s", sPort.c_str());

  Register("/api/add", EMethod::POST, &Supervisor::AddWatcher);
  Register("/api/edit", EMethod::POST, &Supervisor::EditWatcher);
  Register("/api/reload", EMethod::GET, &Supervisor::Reload);
  Register("/api/start", EMethod::GET, &Supervisor::StartWatcher);
  Register("/api/stop", EMethod::GET, &Supervisor::StopWatcher);
  Register("/api/delete", EMethod::GET, &Supervisor::DeleteWatcher);

  // Forward watcher notifications to the websocket clients of that scope.
  GWatcher.SetNotifier([this](const std::string & sScope, const Json::Value & rMsg) {
    Broadcast(sScope, rMsg.toStyledString());
  });

  if (rCmd.Has("daemon")) {
  #if defined(_WIN32)
    HWND hWnd = ::FindWindowA("ConsoleWindowClass", NULL);
    if (hWnd) ::ShowWindow(hWnd, SW_HIDE);
  #else
    // Best effort: keep serving in the foreground if detaching fails.
    if (daemon(1, 0) != 0) LOG_ERR("Supervisor failed to enter daemon mode!");
  #endif
  }

  while (!s_nStop) {
    mg_mgr_poll(&_iMgr, 10);
    GWatcher.Breath();
  }

  GWatcher.StopAll();
  mg_mgr_free(&_iMgr);
}

/**
 * Address filter applied to newly accepted connections.
 *
 * Each entry of the "iptables" configuration array is used as a POSIX
 * extended regular expression that must match the peer's dotted IPv4 address
 * in full. An empty or missing list accepts every peer.
 *
 * @param pConn  Accepted connection whose peer address is checked.
 * @return true when the peer is allowed, false otherwise.
 */
bool Supervisor::CheckConnect(mg_connection * pConn) {
  const Json::Value & rRules = _iConf["iptables"];
  const int nIPCount = rRules.size();
  if (nIPCount <= 0) return true;

  const char * pAddr = inet_ntoa(pConn->sa.sin.sin_addr);
  for (int i = 0; i < nIPCount; ++i) {
    const std::string sPattern = rRules[i].asString();
    try {
      const std::regex iMatch(sPattern, std::regex::extended);
      if (std::regex_match(pAddr, iMatch)) return true;
    } catch (const std::regex_error & rErr) {
      // A malformed pattern must not take the server down; it simply never
      // matches, so the peer is judged by the remaining entries.
      LOG_ERR("Ignoring invalid iptables pattern '%s' : %s", sPattern.c_str(), rErr.what());
    }
  }

  return false;
}

/**
 * Validates the digest credentials of a request against the password file
 * built by LoadSetting().
 *
 * @param pConn  Connection the request arrived on (not used here).
 * @param pMsg   Parsed HTTP request.
 * @return true when no password file is configured, or when
 *         mg_http_check_digest_auth() returns 1 for realm "supervisor".
 */
bool Supervisor::CheckAuth(mg_connection * pConn, http_message * pMsg) {
  (void)pConn;
  if (!_pAuth) return true;

  // The file is re-read from the start on every check. fseek() takes the
  // offset second and the origin third.
  fseek(_pAuth, 0, SEEK_SET);
  return mg_http_check_digest_auth(pMsg, "supervisor", _pAuth) == 1;
}

/**
 * Releases per-connection state when a connection closes: frees the scope
 * string stored in user_data and removes the connection from the websocket
 * list if it is present there.
 *
 * @param pConn  Connection being closed.
 */
void Supervisor::ResponseClose(mg_connection *pConn) {
  // delete[] on a null pointer is a no-op, so no separate check is required.
  delete[] static_cast<char *>(pConn->user_data);

  // Locate the first (and only expected) entry for this connection.
  auto itSock = _vSockets.begin();
  while (itSock != _vSockets.end() && *itSock != pConn) ++itSock;

  if (itSock != _vSockets.end()) _vSockets.erase(itSock);
}

/**
 * Serves a resource obtained from GResource.
 *
 * @param pConn  Connection to answer on.
 * @param sPath  Resource key passed to GResource.Get().
 *
 * Replies with the 403 page when the resource is not found; otherwise sends
 * the asset bytes with a Content-type header built from the asset's type.
 */
void Supervisor::ResponseResource(mg_connection * pConn, const std::string & sPath) {
  Asset * pFound = GResource.Get(sPath);
  if (pFound == NULL) {
    Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
    return;
  }

  std::string sType = "Content-type:";
  sType += pFound->pType;

  const std::string sBody(pFound->pData, pFound->nSize);
  Response(pConn, 200, sType, sBody);
}

/**
 * Called once a websocket handshake has completed.
 *
 * Stores a heap copy of the request URL in the connection's user_data (it is
 * released with delete[] in ResponseClose), adds the connection to the
 * websocket list, and then sends the initial state for the scope:
 *   "/ws"           one "add" message per configured watcher, to this client
 *   "/tail/<name>"  a "tail_sync" message broadcast to that scope when the
 *                   watcher is known to GWatcher
 *
 * @param pConn  The websocket connection.
 * @param sUrl   URL the websocket was opened on; used as the scope name.
 */
void Supervisor::ResponseWebsocketOpened(mg_connection *pConn, const std::string & sUrl) {
  const size_t nLen = sUrl.size();
  char * pCopy = new char[nLen + 1];
  sUrl.copy(pCopy, nLen);
  pCopy[nLen] = '\0';

  pConn->user_data = pCopy;
  _vSockets.push_back(pConn);

  if (sUrl == "/ws") {
    const size_t nCount = _iConf["watcher"].size();
    for (size_t nIdx = 0; nIdx < nCount; ++nIdx) {
      Json::Value iEntry		= _iConf["watcher"][(int)nIdx];
      std::string sWatcher	= iEntry["name"].asString();
      Watcher::Info * pState	= GWatcher.Get(sWatcher);

      // Configuration entry plus runtime fields; placeholders are used when
      // GWatcher has no record for the name.
      Json::Value iOut;
      iOut["action"]					= "add";
      iOut["watcher"]					= iEntry;
      iOut["watcher"]["status"]		= pState ? pState->emStatus : Watcher::EStatus::Stopped;
      iOut["watcher"]["pid"]			= pState ? (int)pState->nPid : -1;
      iOut["watcher"]["start_time"]	= pState ? pState->GetStartTime() : "----";

      const std::string sFrame = iOut.toStyledString();
      mg_send_websocket_frame(pConn, WEBSOCKET_OP_TEXT, sFrame.data(), sFrame.size());
    }
    return;
  }

  if (sUrl.compare(0, 6, "/tail/") != 0) return;

  // Everything after the "/tail/" prefix is the watcher name.
  std::string sWatcher = sUrl.substr(6);
  Watcher::Info * pState = GWatcher.Get(sWatcher);
  if (!pState) return;

  Json::Value iOut;
  iOut["action"]	= "tail_sync";
  iOut["data"]	= pState->pTail;
  Broadcast(sUrl, iOut.toStyledString());
}

/**
 * Dispatches an "/api/..." request to the handler registered for its URI.
 *
 * Parameters are read from the query string for handlers registered as GET
 * and from the request body otherwise, URL-decoded as form data and turned
 * into a flat JSON object by Url2Json().
 *
 * @param pConn  Connection to answer on.
 * @param pMsg   Parsed HTTP request.
 *
 * Replies with the 403 page for unknown URIs and with MSG_BADPARAM when the
 * parameters cannot be URL-decoded.
 */
void Supervisor::ResponseApi(mg_connection * pConn, http_message * pMsg) {
  const std::string sUrl(pMsg->uri.p, pMsg->uri.len);
  auto it = _mApis.find(sUrl);

  if (it == _mApis.end()) {
    Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
    return;
  }

  const auto & rRaw = (it->second.emMethod == EMethod::GET) ? pMsg->query_string : pMsg->body;

  // One extra byte for the terminator written by mg_url_decode().
  std::string sBuf(rRaw.len + 1, '\0');
  const int nSize = mg_url_decode(rRaw.p, static_cast<int>(rRaw.len), &sBuf[0], static_cast<int>(sBuf.size()), 1);

  // A negative result signals undecodable input (e.g. a broken %XX escape);
  // it must not be used as a length.
  if (nSize < 0) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  const Json::Value iParam = Url2Json(sBuf.data(), static_cast<size_t>(nSize));
  (this->*(it->second.fProc))(pConn, iParam);
}

/**
 * Sends a 401 response carrying a Digest authentication challenge for realm
 * "supervisor". The nonce is the current time in seconds and the opaque
 * value is the MD5 hex digest of the realm name.
 *
 * @param pConn  Connection to answer on.
 */
void Supervisor::ResponseNeedAuth(mg_connection * pConn) {
  static const char kRealm[] = "supervisor";
  char pMD5[33] = {0};

  std::string sHeader;
  sHeader.append("WWW-Authenticate: Digest realm=\"supervisor\",");
  sHeader.append("qop=\"auth\",");
  sHeader.append("nonce=\"");
  sHeader.append(std::to_string((uint32_t)time(NULL)));
  sHeader.append("\",opaque=\"");
  // cs_md5() is variadic: the length is passed as a size_t (sizeof yields
  // one) and the list is ended with a pointer-typed null.
  sHeader.append(cs_md5(pMD5, kRealm, sizeof(kRealm) - 1, nullptr));
  sHeader.append("\"");

  Response(pConn, 401, sHeader, "");
}

/**
 * API handler: re-reads the configuration file.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters (not used).
 *
 * Replies with MSG_SUCCESS when LoadSetting() succeeds and with a JSON
 * error object when it fails.
 */
void Supervisor::Reload(mg_connection * pConn, const Json::Value & rParam) {
  (void)rParam;

  if (!LoadSetting()) {
    Response(pConn, 200, HEADER_JSON, "{\"err\": \"Reload configuration failed!\"}");
    return;
  }

  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

/**
 * API handler: adds a new watcher entry to the configuration.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters: "name" and "cmd" are required, "dir"
 *                falls back to "/" and "retry" to 0 when absent.
 *
 * Replies with MSG_BADPARAM when a required parameter is missing and with
 * MSG_BADNAME when an entry of that name already exists. On success the
 * entry is appended, the configuration is saved, an "add" message is
 * broadcast on "/ws" and MSG_SUCCESS is returned.
 */
void Supervisor::AddWatcher(mg_connection * pConn, const Json::Value & rParam) {
  const bool bIncomplete = rParam["name"].isNull() || rParam["cmd"].isNull();
  if (bIncomplete) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  const std::string sWanted = rParam["name"].asString();
  const bool bTaken = !GetConf(sWanted).isNull();
  if (bTaken) {
    Response(pConn, 200, HEADER_JSON, MSG_BADNAME);
    return;
  }

  Json::Value iEntry;
  iEntry["name"]	= rParam["name"];
  iEntry["cmd"]	= rParam["cmd"];

  if (rParam["dir"].isNull()) {
    iEntry["dir"] = "/";
  } else {
    iEntry["dir"] = rParam["dir"];
  }

  if (rParam["retry"].isNull()) {
    iEntry["retry"] = 0;
  } else {
    iEntry["retry"] = atoi(rParam["retry"].asCString());
  }

  // Make sure the list exists as an array before appending to it.
  Json::Value & rList = _iConf["watcher"];
  if (rList.isNull()) rList = Json::Value(Json::arrayValue);
  rList.append(iEntry);
  SaveSetting();

  Json::Value iNotice;
  iNotice["action"]				= "add";
  iNotice["watcher"]				= iEntry;
  iNotice["watcher"]["status"]	= Watcher::EStatus::Stopped;

  Broadcast("/ws", iNotice.toStyledString());
  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

/**
 * API handler: modifies (and optionally renames) an existing watcher entry.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters: "org-name" (current name), "name" (new
 *                name) and "cmd" are required; "dir" falls back to "/" and
 *                "retry" to 0 when absent.
 *
 * Replies with:
 *   MSG_BADPARAM  a required parameter is missing or "org-name" is unknown
 *   MSG_NEEDSTOP  the watcher is currently running
 *   MSG_BADNAME   another entry already uses the new name
 *   MSG_SUCCESS   the entry was updated; the configuration is saved, the old
 *                 watcher record is removed from GWatcher and an "add"
 *                 message (with "rename" set when the name changed) is
 *                 broadcast on "/ws"
 */
void Supervisor::EditWatcher(mg_connection * pConn, const Json::Value & rParam) {
  if (rParam["org-name"].isNull() || rParam["name"].isNull() || rParam["cmd"].isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  std::string sOldName = rParam["org-name"].asString();
  std::string sNewName = rParam["name"].asString();

  Watcher::Info * pStatus = GWatcher.Get(sOldName);
  if (pStatus && pStatus->emStatus == Watcher::EStatus::Running) {
    Response(pConn, 200, HEADER_JSON, MSG_NEEDSTOP);
    return;
  }

  Json::Value iFind = Json::Value::null;
  int nIdx = 0;

  // The whole list is scanned: besides locating the entry to edit, every
  // other entry is checked for a clash with the new name.
  for (int i = 0; i < (int)_iConf["watcher"].size(); ++i) {
    std::string sFind = _iConf["watcher"][i]["name"].asString();
    if (sFind == sOldName) {
      iFind	= _iConf["watcher"][i];
      nIdx	= i;
    } else if (sFind == sNewName) {
      Response(pConn, 200, HEADER_JSON, MSG_BADNAME);
      return;
    }
  }

  if (iFind.isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  iFind["name"]	= sNewName;
  iFind["cmd"]	= rParam["cmd"];
  iFind["dir"]	= rParam["dir"].isNull() ? "/" : rParam["dir"];
  iFind["retry"]	= rParam["retry"].isNull() ? 0 : atoi(rParam["retry"].asCString());

  _iConf["watcher"][nIdx] = iFind;
  GWatcher.Remove(sOldName);
  SaveSetting();

  Json::Value iMsg;
  iMsg["action"]				= "add";
  iMsg["watcher"]				= iFind;
  iMsg["watcher"]["status"]	= Watcher::EStatus::Stopped;

  if (sNewName != sOldName) iMsg["rename"] = sOldName;
  Broadcast("/ws", iMsg.toStyledString());
  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

/**
 * API handler: starts the watcher named by the "name" parameter using the
 * directory, command and retry count from its configuration entry.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters; "name" is required.
 *
 * Replies with MSG_BADPARAM when "name" is missing or not configured, and
 * with MSG_SUCCESS after handing the entry to GWatcher.Start().
 */
void Supervisor::StartWatcher(mg_connection * pConn, const Json::Value & rParam) {
  const Json::Value & rName = rParam["name"];
  if (rName.isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  std::string sTarget = rName.asString();
  Json::Value iEntry = GetConf(sTarget);
  if (iEntry.isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  std::string sWorkDir	= iEntry["dir"].asString();
  std::string sCommand	= iEntry["cmd"].asString();
  int nAttempts			= iEntry["retry"].asInt();

  GWatcher.Start(sTarget, sWorkDir, sCommand, nAttempts);
  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

/**
 * API handler: stops the watcher named by the "name" parameter.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters; "name" is required.
 *
 * Replies with MSG_BADPARAM when "name" is missing or not configured, and
 * with MSG_SUCCESS after calling GWatcher.Stop().
 */
void Supervisor::StopWatcher(mg_connection * pConn, const Json::Value & rParam) {
  const Json::Value & rName = rParam["name"];
  if (rName.isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  std::string sTarget = rName.asString();
  const bool bKnown = !GetConf(sTarget).isNull();
  if (!bKnown) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  GWatcher.Stop(sTarget);
  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

/**
 * API handler: removes every configuration entry whose name equals the
 * "name" parameter.
 *
 * @param pConn   Connection to answer on.
 * @param rParam  Request parameters; "name" is required.
 *
 * Replies with MSG_BADPARAM when "name" is missing and with MSG_NEEDSTOP
 * when the watcher is currently running. Otherwise the entries are removed,
 * the watcher record is dropped from GWatcher, the configuration is saved, a
 * "delete" message is broadcast on "/ws" and MSG_SUCCESS is returned.
 */
void Supervisor::DeleteWatcher(mg_connection * pConn, const Json::Value & rParam) {
  if (rParam["name"].isNull()) {
    Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
    return;
  }

  std::string sName = rParam["name"].asString();

  Watcher::Info * pStatus = GWatcher.Get(sName);
  if (pStatus && pStatus->emStatus == Watcher::EStatus::Running) {
    Response(pConn, 200, HEADER_JSON, MSG_NEEDSTOP);
    return;
  }

  Json::Value & rList = _iConf["watcher"];
  for (int i = 0; i < (int)rList.size(); ) {
    // Removing an element shifts the following ones down, so the index only
    // advances when nothing was removed at this position.
    if (rList[i]["name"].asString() == sName) {
      Json::Value iRemoved;
      if (rList.removeIndex(i, &iRemoved)) continue;
    }
    ++i;
  }

  GWatcher.Remove(sName);
  SaveSetting();

  Json::Value iMsg;
  iMsg["action"]	= "delete";
  iMsg["watcher"]	= sName;

  Broadcast("/ws", iMsg.toStyledString());
  Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}

bool Supervisor::LoadSetting() {
  std::ifstream	ifs("supervisor.json");
  Json::Reader	iReader;
  Json::Value		iTemp;

  if (!iReader.parse(ifs, iTemp, false)) {
    LOG_ERR("Load configuration failed : %s", iReader.getFormattedErrorMessages().c_str());
    return false;
  }

  if (_pAuth) {
    fclose(_pAuth);
    _pAuth = NULL;
  }

  int nUsers = iTemp["user"].size();
  if (nUsers > 0) {
    _pAuth = tmpfile();
    char * pBuf = new char[256];
    char * pMD5 = new char[33];

    for (int i = 0; i < nUsers; ++i) {
      Json::Value & r 		= iTemp["user"][i];
      std::string sAccount	= r["account"].asString();
      std::string sPswd		= r["pswd"].asString();
      std::string sData		= sAccount + ":supervisor:" + sPswd;
      char * pH1 				= cs_md5(pMD5, sData.data(), sData.size(), NULL);

      snprintf(pBuf, 256, "%s:supervisor:%s\n", sAccount.data(), pH1);
      fprintf(_pAuth, "%s", pBuf);
    }

    delete[] pBuf;
    delete[] pMD5;
  }

  _iConf = iTemp;
  return true;
}

/**
 * Writes the in-memory configuration to "supervisor.json" as styled JSON
 * followed by a newline, replacing the previous file contents.
 */
void Supervisor::SaveSetting() {
  std::ofstream ofs("supervisor.json");

  const std::string sText = Json::StyledWriter().write(_iConf);

  // std::endl both terminates the line and flushes the stream.
  ofs << sText << std::endl;
  ofs.close();
}

void Supervisor::Register(const std::string & sUrl, EMethod emMethod, Proc fProc) {
  if (_mApis.find(sUrl) != _mApis.end()) return;

  Processor iProc;
  iProc.emMethod	= emMethod;
  iProc.fProc		= fProc;
  _mApis[sUrl]	= iProc;
}

/**
 * Converts an already URL-decoded "key=value&key=value" string into a flat
 * JSON object of string values.
 *
 * Pairs with an empty value are skipped. Parsing stops at the first segment
 * without '=' or whose '=' is the last character of the input. When the
 * input has no '=', starts with '=', or has '=' only as its last character,
 * an empty (null) value is returned.
 *
 * @param pData  Start of the decoded text (need not be NUL-terminated).
 * @param nSize  Number of bytes at pData.
 * @return The parsed parameters.
 */
Json::Value Supervisor::Url2Json(const char * pData, size_t nSize) {
  Json::Value iParam;
  if (!pData || nSize == 0) return iParam;

  const std::string sUrl(pData, nSize);

  size_t nKey	= 0;				// index of the first character of the current key
  size_t nEq	= sUrl.find('=');	// index of the '=' that ends the current key

  if (nEq == std::string::npos || nEq == nSize - 1 || nEq == 0)
    return iParam;

  for (;;) {
    size_t nNext = sUrl.find('&', nEq + 1);
    if (nNext == std::string::npos) nNext = nSize;

    const std::string sKey = sUrl.substr(nKey, nEq - nKey);
    const std::string sVal = sUrl.substr(nEq + 1, nNext - nEq - 1);

    if (!sVal.empty()) iParam[sKey] = sVal;

    if (nNext == nSize) break;

    // Continue with the pair that follows the '&'.
    nKey	= nNext + 1;
    nEq		= sUrl.find('=', nNext);
    if (nEq == std::string::npos || nEq >= nSize - 1) break;
  }

  // Returned by name so that copy elision / implicit move can apply.
  return iParam;
}

/**
 * Builds a description of a watcher: its configuration entry (if any) with
 * "name" set to sName and "status" set to the runtime status reported by
 * GWatcher, or Stopped when GWatcher has no record for it.
 *
 * @param sName  Watcher name.
 * @return The combined JSON object.
 */
Json::Value Supervisor::GetWatcherInfo(const std::string & sName) {
  Watcher::Info *	pInfo = GWatcher.Get(sName);
  Json::Value		iInfo = GetConf(sName);

  iInfo["name"]	= sName;
  iInfo["status"]	= pInfo ? pInfo->emStatus : Watcher::EStatus::Stopped;

  // Returned by name so that copy elision / implicit move can apply.
  return iInfo;
}

/**
 * Looks up the configuration entry of a watcher by name.
 *
 * @param sName  Watcher name to search for in the "watcher" array.
 * @return A copy of the first matching entry, or a null value when no entry
 *         has that name.
 */
Json::Value Supervisor::GetConf(const std::string & sName) {
  const Json::Value & rList = _iConf["watcher"];
  const int nWatcher = rList.size();

  for (int n = 0; n < nWatcher; ++n) {
    // Inspect entries in place; only the match is copied, on return.
    const Json::Value & rWatcher = rList[n];
    if (rWatcher["name"].asString() == sName) return rWatcher;
  }

  return Json::Value::null;
}

/**
 * Sends a complete HTTP response and marks the connection to be closed once
 * the data has been sent.
 *
 * @param pConn      Connection to answer on.
 * @param nHttpCode  HTTP status code.
 * @param sHeader    Extra header text; "Content-type:text/plain" is used when
 *                   empty.
 * @param sContent   Response body; may be empty.
 */
void Supervisor::Response(mg_connection * pConn, int nHttpCode, const std::string & sHeader, const std::string & sContent) {
  const char * pExtra = "Content-type:text/plain";
  if (!sHeader.empty()) pExtra = sHeader.c_str();

  const size_t nBody = sContent.size();
  mg_send_head(pConn, nHttpCode, nBody, pExtra);

  if (nBody != 0) mg_send(pConn, sContent.data(), nBody);

  pConn->flags |= MG_F_SEND_AND_CLOSE;
}

void Supervisor::Broadcast(const std::string & sScope, const std::string & sJson) {
  for (mg_connection * p : _vSockets) {
    char * pScope = (char *)p->user_data;
    if (sScope == pScope) mg_send_websocket_frame(p, WEBSOCKET_OP_TEXT, sJson.data(), sJson.size());
  }
}

int main(int nArgc, char * pArgv[]) {
  Command iCmd(nArgc, pArgv);
  Supervisor iService;
  iService.Start(iCmd);
  return 0;	
}
